Recommender Systems are a set of methods that predict the 'rating' or 'preference' a user would give to an item. Among the different approaches to designing this kind of system, in this lab session we are going to work with Collaborative Filtering (CF) approaches. However, unlike the previous lab session, we are going to work with distributed implementations based on Spark.
Throughout this notebook we are going to use the MovieLens dataset. The MovieLens data sets were collected by the GroupLens Research Project at the University of Minnesota. The original version of this problem contains 10 million ratings applied to 10,681 movies by 71,567 users. However, for this lab, we will use a reduced version consisting of 100,000 ratings (with values from 1 to 5) from 943 users on 1,682 movies, where each user has rated at least 20 movies.
As you progress in this notebook, you will have to complete some exercises. Each exercise includes an explanation of what is expected, followed by code cells where one or several lines contain <FILL IN>. The cell that needs to be modified has # TODO: Replace <FILL IN> with appropriate code on its first line. Once the <FILL IN> sections are updated, the code can be run; below this cell, you will find the test cell (beginning with the line # TEST CELL), which you can run to verify the correctness of your solution.
In [ ]:
# Import some libraries
import numpy as np
import math
from test_helper import Test
In [ ]:
# Define data file
ratingsFilename = 'u.data'
# Read data with spark
rawRatings = sc.textFile(ratingsFilename)
# Check file format
print rawRatings.take(10)
As you can check, each line is formatted as: UserID \t MovieID \t Rating \t Timestamp \n. So, let's convert each line into a list with the fields [UserID, MovieID, Rating] (we drop the timestamp because we do not need it for this exercise).
In order to work in a distributed way, let's start by implementing a function (format_ratings) that converts each line into the desired format. Then, we can call this function from our RDD with the map method to apply it over each line.
Tip: Check the Python function split() to convert each line into a list of items split by a given character.
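For instance, this is a minimal sketch of how split() behaves, using the sample line that also appears in the test cell below:
sample_line = u'196\t242\t3\t881250949'
fields = sample_line.split('\t')
print fields          # [u'196', u'242', u'3', u'881250949']
print int(fields[0])  # 196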
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
def format_ratings(line):
    """ Parse a line in the ratings dataset
    Args:
        line (str): a line in the ratings dataset in the form of UserID \t MovieID \t Rating \t Timestamp \n
    Returns:
        tuple: (UserID, MovieID, Rating)
    """
    # Split the line on the character '\t'
    items = # FILL IN
    # Get UserID and convert it to int
    user_id = # FILL IN
    # Get ItemID and convert it to int
    item_id = # FILL IN
    # Get Rating and convert it to float
    rating = # FILL IN
    # Return UserID, ItemID and Rating
    return # FILL IN
In [ ]:
###########################################################
# TEST CELL
###########################################################
check_line = u'196\t242\t3\t881250949'
check_tuple = format_ratings(check_line)
Test.assertEquals(check_tuple, (196, 242, 3), 'incorrect result: data are incorrectly formatted')
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
# Convert each line of rawRatings
ratingsRDD = # FILL IN
# Show the output
print ratingsRDD.take(10)
In [ ]:
###########################################################
# TEST CELL
###########################################################
Test.assertEquals(ratingsRDD.first(), (196, 242, 3), 'incorrect result: data are incorrectly formatted')
Now, to be able to train and evaluate the different methods, let's split the ratings RDD into two different RDDs: a training set and a test set.
Hint: you can apply the randomSplit() method of the RDD to divide it at random.
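As a reference, here is a minimal sketch of randomSplit() on a toy RDD (the weights below are only illustrative, not necessarily the ones you need for this exercise):
toyRDD = sc.parallelize(range(10))
partA, partB = toyRDD.randomSplit([0.5, 0.5], seed=0L)
print partA.collect(), partB.collect()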
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
trainingRDD, testRDD = ratingsRDD.randomSplit(#FILL IN, seed=0L)
print 'Training: %s, test: %s\n' % (trainingRDD.count(), testRDD.count())
In [ ]:
###########################################################
# TEST CELL
###########################################################
Test.assertEquals(trainingRDD.count(), 75008, 'incorrect result: number of training ratings is incorrect')
Test.assertEquals(testRDD.count(), 24992, 'incorrect result: number of test ratings is incorrect')
Test.assertEquals(trainingRDD.first(), (186, 302, 3.0), 'incorrect result: the values of the training RDD are incorrect')
Test.assertEquals(testRDD.first(), (196, 242, 3.0), 'incorrect result: the values of the testing RDD are incorrect')
In this section we are going to build a mean-based baseline; that is, the recommender will predict new ratings as the average value of the ratings that the user gave to previously rated items.
To design this approach, let's start by building a function that, given a user_id and all its associated ratings, computes the average value of those ratings.
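As an illustration (plain Python, not the Spark code you have to write), the baseline prediction for a user is just the mean of that user's training ratings:
toy_user_ratings = [4.0, 5.0, 3.0]   # ratings previously given by one (toy) user
print sum(toy_user_ratings) / float(len(toy_user_ratings))   # 4.0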
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
def getAverages(IDandRatingsTuple):
    """ Calculate average rating
    Args:
        IDandRatingsTuple: a single tuple of (ID_user, (Rating1, Rating2, Rating3, ...))
    Returns:
        tuple: a tuple of (ID_user, averageRating)
    """
    id_user = # FILL IN
    mean_value = # FILL IN
    return (id_user, mean_value)
In [ ]:
###########################################################
# TEST CELL
###########################################################
check_ratings = (0, iter([2, 5, 3, 1, 2]))
check_output = getAverages(check_ratings)
Test.assertEquals(check_output, (0, 2.6), 'incorrect result: check_output is incorrect')
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
# From trainingRDD with tuples of (UserID, MovieID, Rating), create an RDD with tuples of
# the form (UserID, Rating), i.e., remove the MovieID field.
RDD_users_ratings = trainingRDD.# FILL IN
# From the RDD of (UserID, Rating), create an RDD with tuples of
# (UserID, iterable of Ratings for that UserID), where the iterable of Ratings for that UserID has
# all the ratings given by UserID. Review the groupByKey() method of RDD elements.
RDD_users_allratings = RDD_users_ratings.# FILL IN
# Using getAverages(), compute the average rating of each user.
RDD_users_mean = RDD_users_allratings.# FILL IN
In [ ]:
###########################################################
# TEST CELL
###########################################################
Test.assertEquals(RDD_users_ratings.first(), (186, 3.0), 'incorrect result: RDD_users_ratings is incorrect')
Test.assertEquals(list(RDD_users_allratings.first()[1])[:5], [4.0, 5.0, 4.0, 3.0, 3.0], 'incorrect result: RDD_users_allratings is incorrect')
Test.assertEquals(np.round(RDD_users_mean.first()[1],2), 3.69, 'incorrect result: RDD_users_mean is incorrect')
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
# Create a new RDD, RDD_test_ids, consisting of (UserID, MovieID) pairs
# that you extract from testRDD. That is, remove the field Rating from testRDD.
RDD_test_ids = testRDD.# FILL IN
# Using the user_id as key, join RDD_test_ids with RDD_users_mean.
# Review the method leftOuterJoin() of RDD elements.
RDD_test_ids_mean = RDD_test_ids.# FILL IN
# Note that the resulting RDD provided by the leftOuterJoin() method has the format
# (IdUser, (IdItem, PredRating)). Remap it to create an RDD with tuples (IdUser, IdItem, PredRating)
RDD_pred_mean = RDD_test_ids_mean.# FILL IN
In [ ]:
###########################################################
# TEST CELL
###########################################################
Test.assertEquals(RDD_test_ids.first(), (196, 242), 'incorrect result: RDD_test_ids is incorrect')
Test.assertEquals(RDD_test_ids_mean.first(), (512, (23, 4.294117647058823)), 'incorrect result: RDD_test_ids_mean is incorrect')
Test.assertEquals(RDD_pred_mean.first(), (512, 23, 4.294117647058823), 'incorrect result: RDD_pred_mean is incorrect')
Finally, let's evaluate the goodness of the computed predictions over the test data. To evaluate them, we are going to use two measurements: the Root Mean Squared Error (RMSE) and the Mean Absolute Error (MAE).
The next cell contains a function that, given two RDDs, the first with the predicted ratings and the second with the real rating values, computes the RMSE value. Use it as an example to create a new function that calculates the MAE value.
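For instance, if the predictions for two (user, item) pairs are 5 and 3 and the actual ratings are 3 and 2 (the toy values used later in the MAE test cell), then: $$ RMSE = \sqrt{\frac{(5-3)^2 + (3-2)^2}{2}} = \sqrt{2.5} \approx 1.58, \qquad MAE = \frac{|5-3| + |3-2|}{2} = 1.5 $$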
In [ ]:
def get_RMSE(predictedRDD, actualRDD):
    """ Compute the root mean squared error between two RDDs with the predicted and actual ratings
    Args:
        predictedRDD: predicted ratings for each movie and each user where each entry is in the form
            (UserID, MovieID, Rating)
        actualRDD: actual ratings where each entry is in the form (UserID, MovieID, Rating)
    Returns:
        RMSE (float): computed RMSE value
    """
    # Transform predictedRDD into tuples of the form ((UserID, MovieID), Rating)
    predictedReformattedRDD = predictedRDD.map(lambda x: ((x[0], x[1]), x[2]))
    # Transform actualRDD into tuples of the form ((UserID, MovieID), Rating)
    actualReformattedRDD = actualRDD.map(lambda x: ((x[0], x[1]), x[2]))
    # Compute the squared error for each matching entry (i.e., the same (UserID, MovieID) in each
    # RDD) in the reformatted RDDs using RDD transformations - do not use collect()
    squaredErrorsRDD = (predictedReformattedRDD.join(actualReformattedRDD).map(lambda x: pow(x[1][0] - x[1][1], 2)))
    # Compute the total squared error - do not use collect()
    totalError = squaredErrorsRDD.reduce(lambda a, b: a + b)
    # Count the number of entries for which we computed the squared error
    numRatings = squaredErrorsRDD.count()
    # Using the total squared error and the number of entries, compute the RMSE
    return math.sqrt(float(totalError) / numRatings)
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
# Create a function to compute the MAE error
def get_MAE(predictedRDD, actualRDD):
    """ Compute the mean absolute error between predicted and actual ratings
    Args:
        predictedRDD: predicted ratings for each movie and each user where each entry is in the form
            (UserID, MovieID, Rating)
        actualRDD: actual ratings where each entry is in the form (UserID, MovieID, Rating)
    Returns:
        MAE (float): computed MAE value
    """
    # Transform predictedRDD into tuples of the form ((UserID, MovieID), Rating)
    predictedReformattedRDD = # FILL IN
    # Transform actualRDD into tuples of the form ((UserID, MovieID), Rating)
    actualReformattedRDD = # FILL IN
    # Compute the absolute error for each matching entry (i.e., the same (UserID, MovieID) in each
    # RDD) in the reformatted RDDs using RDD transformations - do not use collect()
    AbsoluteErrorsRDD = # FILL IN
    # Compute the total absolute error - do not use collect()
    totalError = # FILL IN
    # Count the number of entries for which you computed the total absolute error
    numRatings = # FILL IN
    # Using the total absolute error and the number of entries, compute the MAE
    return # FILL IN
In [ ]:
###########################################################
# TEST CELL
###########################################################
check_Predicted = sc.parallelize([(0, 0, 5), (0, 1, 3)])
check_Actual = sc.parallelize([(0, 0, 3), (0, 1, 2)])
Test.assertEquals(get_MAE(check_Predicted, check_Actual), 1.5, 'incorrect result: function get_MAE() is incorrect')
Now, let's evaluate the performance of the mean-based baseline.
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
# Compute the MAE error for the mean-based baseline
MAE_mean = # FILL IN
# Compute the RMSE error for the mean-based baseline
RMSE_mean = # FILL IN
print 'Mean model ... MAE: %2.2f , RMSE: %2.2f ' % (MAE_mean, RMSE_mean)
In [ ]:
###########################################################
# TEST CELL
###########################################################
Test.assertEquals(np.round(MAE_mean,2), 0.83, 'incorrect result: MAE value of mean recommender is incorrect')
Test.assertEquals(np.round(RMSE_mean,2), 1.04, 'incorrect result: RMSE value of mean recommender is incorrect')
Now, let's work with the ALS algorithm. As you know, this method tries to approximate the ratings matrix by factorizing it as the product of two matrices:
$$ R = X * Y $$
where $X$ describes properties of each user, and $Y$ describes properties of each item. These two matrices are known as latent factors, since they are low-dimensional representations of users and items.
If we examine the utilities of MLlib, we can find an implementation of the ALS algorithm. So, in this section, we will learn how to use this module of MLlib.
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
from pyspark.mllib.recommendation import ALS
# Define parameters
n_latent_factors = 5
numIterations = 15
# Train the model (set seed=0L)
sc.setCheckpointDir('checkpoint/')
model = # FILL IN , seed=0L)
Once the model has been trained, let's make the recommendations. For this purpose, the ALS model has a method model.predictAll(testdata) which estimates the ratings over an RDD of ID pairs (userID, itemID).
So, complete the next cell to estimate the ratings over the (user, item) pairs of our test data.
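As a reference, here is a self-contained toy sketch (illustrative data and parameter values, not those of the exercise) of the two MLlib calls involved, ALS.train() and model.predictAll():
from pyspark.mllib.recommendation import ALS
toy_ratings = sc.parallelize([(1, 10, 4.0), (1, 20, 1.0), (2, 10, 5.0), (2, 30, 2.0)])
toy_model = ALS.train(toy_ratings, rank=2, iterations=5, seed=0L)
toy_pairs = sc.parallelize([(1, 30), (2, 20)])
print toy_model.predictAll(toy_pairs).collect()   # a list of Rating(user, product, rating) objects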
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
# Create a new RDD, RDD_test_ids, consisting of (UserID, MovieID) pairs
# that you extract from testRDD. That is, remove the field Rating from testRDD.
RDD_test_ids = # FILL IN
# Estimate their ratings with model.predictAll( )
predictions = # FILL IN
# Print the first 10 predictions
predictions.take(10)
In [ ]:
###########################################################
# TEST CELL
###########################################################
check_predictions = predictions.filter(lambda x: (x[0]==621) & (x[1]==68)).first()
Test.assertEquals(np.round(check_predictions[2],1), 3.7, 'incorrect result: predicted value is incorrect')
check_predictions = predictions.filter(lambda x: (x[0]==880) & (x[1]==8)).first()
Test.assertEquals(np.round(check_predictions[2],1), 4, 'incorrect result: predicted value is incorrect')
Note that, although each element of the RDD predictions is an object, you can extract the UserID, ItemID and predicted rating by accessing its first, second, and third elements, respectively. See the example in the next cell...
In [ ]:
x = predictions.first()
print 'User ID: ' + str(x[0])
print 'Item ID: ' + str(x[1])
print 'Predicted rating: ' + str(x[2])
Advanced work
Which are the 5 top-ranked items for the user with id=10?
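As a hint, here is a toy sketch of filter() and sortBy() (use ascending=False to sort from the highest to the lowest rating); the tuples below are illustrative (user, item, rating) triples:
toyRDD = sc.parallelize([(10, 7, 4.9), (10, 9, 4.7), (3, 5, 4.0)])
print toyRDD.filter(lambda x: x[0] == 10).sortBy(lambda x: x[2], ascending=False).take(2)
# [(10, 7, 4.9), (10, 9, 4.7)]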
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
user_id = 10
# Select the outputs of the user_id=10 (hint: filter method)
predictions_userid = # FILL IN
# Sort the outputs according to rating field (hint: sortBy method)
predictions_userid_sorted = # FILL IN
predictions_userid_sorted.take(5)
In [ ]:
###########################################################
# TEST CELL
###########################################################
check_output = predictions_userid_sorted.map(lambda x:x[1]).take(5)
Test.assertEquals(check_output, [483, 127, 174, 701, 185], 'incorrect result: recommended items are incorrect')
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
# Compute the MAE error
MAE_als = # FILL IN
# Compute the RMSE error
RMSE_als = # FILL IN
print 'ALS model ... MAE: %2.2f , RMSE: %2.2f ' % (MAE_als, RMSE_als)
In [ ]:
###########################################################
# TEST CELL
###########################################################
Test.assertEquals(np.round(MAE_als,2), 0.77, 'incorrect result: MAE value of ALS recommender is incorrect')
Test.assertEquals(np.round(RMSE_als,2), 1.01, 'incorrect result: RMSE value of ALS recommender is incorrect')
In this last section, we are going to implement a user-based collaborative filtering system on Spark.
As you know, the general algorithm has two steps: first, compute the similarity between each pair of users; second, predict each unknown rating from the ratings that the most similar users have given to that item.
To make this implementation easier, let's start by precomputing all the similarities, which we will then use in the second step.
In the next cell, you are given a function to compute the Pearson correlation coefficient, defined as: $$ sim(user_a, user_b) = \frac{\sum_{p \in P} (r_{a,p} -\bar{r}_a)(r_{b,p} -\bar{r}_b)} {\sqrt{ \sum_{p \in P} (r_{a,p} -\bar{r}_a)^2} \sqrt{ \sum_{p \in P} (r_{b,p} -\bar{r}_b)^2}}$$
where $P$ is the set of items rated by both users $a$ and $b$, $r_{u,p}$ is the rating given by user $u$ to item $p$, and $\bar{r}_u$ is the mean value of all the ratings of user $u$.
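As a numerical illustration with toy ratings over the common items (note that, for simplicity, the means here are computed over the common items only, whereas the function below uses each user's full rating history):
r_a = np.array([4.0, 5.0, 1.0])   # ratings of user a on the three common items
r_b = np.array([3.0, 5.0, 2.0])   # ratings of user b on the same items
r_a_c = r_a - r_a.mean()
r_b_c = r_b - r_b.mean()
print np.dot(r_a_c, r_b_c) / (np.sqrt(np.dot(r_a_c, r_a_c)) * np.sqrt(np.dot(r_b_c, r_b_c)))
# approximately 0.89: both users deviate from their means in the same direction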
In [ ]:
def compute_Pearson_correlation(ratings_u1, ratings_u2, n_items_th=1):
    """ Calculate correlation coefficient
    Args:
        ratings_u1: tuple (IdUser1, spark-iterable of (item, rating) tuples) with all the ratings of user 1
        ratings_u2: tuple (IdUser2, spark-iterable of (item, rating) tuples) with all the ratings of user 2
        n_items_th: minimum number of common items that both users must have rated to compute their similarity.
            If the users have fewer than n_items_th common rated items, their similarity is set to zero.
            By default, n_items_th is set to 1.
    Returns:
        corr_value: correlation coefficient
    """
    # Get the items and values rated by user 1
    [items_u1, values_u1] = zip(*list(ratings_u1[1]))
    # Get the items and values rated by user 2
    [items_u2, values_u2] = zip(*list(ratings_u2[1]))
    # Get the set of items rated by both users and their values
    r_u1 = [values_u1[i] for i, item in enumerate(items_u1) if item in items_u2]
    r_u2 = [values_u2[i] for i, item in enumerate(items_u2) if item in items_u1]
    if len(r_u1) >= n_items_th:  # If there are enough common rated items...
        # Compute the means of the user ratings (over all of their ratings)
        m_1 = np.mean(np.array(values_u1))
        m_2 = np.mean(np.array(values_u2))
        # Remove their means
        r_u1 = np.array(r_u1) - m_1
        r_u2 = np.array(r_u2) - m_2
        # Compute the correlation coefficient
        corr_value = np.dot(r_u1, r_u2.T) / (np.sqrt(np.dot(r_u1, r_u1.T)) * np.sqrt(np.dot(r_u2, r_u2.T)))
        # Remove useless dimensions
        corr_value = np.squeeze(corr_value)
    else:  # Else the correlation is 0
        corr_value = 0
    # Check that the correlation is not NaN (this would happen if the denominator is 0);
    # in that case, set the correlation coefficient to 0
    if math.isnan(corr_value):
        corr_value = 0
    return corr_value
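# A minimal usage sketch (not part of the exercise): plain Python lists of (item, rating)
# tuples stand in for the spark-iterables you will obtain later with groupByKey()
toy_u1 = (1, [(10, 4.0), (20, 5.0), (30, 1.0)])
toy_u2 = (2, [(10, 3.0), (20, 5.0), (30, 2.0), (40, 4.0)])
print compute_Pearson_correlation(toy_u1, toy_u2, 2)   # a positive value: these toy users rate the common items similarly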
Now, complete the next cell to evaluate the function compute_Pearson_correlation( ).
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
# 1. From trainingRDD, create an RDD where each element is (userID, (ItemID, rating)), i.e.,
# the userID is the key and the pair (ItemID, rating) is the value.
RDD_users_as_key = #FILL IN
# 2. Group the elements of RDD_users_as_key by key (see the groupByKey() method)
# Each element of this new RDD is (userID, spark-iterable), where the spark-iterable has
# a list with all the rated items as elements (ItemID, rating)
RDD_users_ratings = #FILL IN
# 3. Extract the spark-iterable elements with all the ratings of users 1 and 2
id_u1 = 1
ratings_u1 = #FILL IN
id_u2 = 2
ratings_u2 = #FILL IN
# 4. Compute their similarity
n_items_th = 4
similarity = compute_Pearson_correlation(ratings_u1, ratings_u2, n_items_th)
print similarity
In [ ]:
###########################################################
# TEST CELL
###########################################################
Test.assertEquals(np.round(similarity,2), 0.80, 'incorrect result: similarity value is incorrect')
Once we can compute similarities between two users, let's compute, for each user, its similarity with all the remaining users. The output of this cell will be an RDD of similarities where each element is (UserID, spark-iterable), where the spark-iterable is an iterable list with pairs (UserID, similarity).
Note that it is enough for this list to keep only the users with a similarity larger than zero, or larger than a given threshold.
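As a reference, here is a toy sketch of cartesian(); note the nested structure of its output (the element order may differ):
toy = sc.parallelize([(1, 'a'), (2, 'b')])
print toy.cartesian(toy).collect()
# e.g. [((1, 'a'), (1, 'a')), ((1, 'a'), (2, 'b')), ((2, 'b'), (1, 'a')), ((2, 'b'), (2, 'b'))]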
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
# 1. From trainingRDD, create an new RDD with elements (userID, spark-iterable), where
# spark iterable has a list [(ItemID, rating), ...] with all the items rated by UserID
# (see previous section)
RDD_users_ratings = # FILL IN
# 2. Create all the combinations of pairs with the users (see cartesian method of RDD elements)
# Note that cartesian returns an RDD with elements ((id_1, iterable_ratings 1), (id_2, iterable_ratings 2))
pairs_users = # FILL IN
# 3. Compute correlation values with the function compute_Pearson_correlation()
n_items_th = 4
correlation_values = # FILL IN
# 4. Select correlation values larger than the similarity threshold (filter method)
sim_th = 0.2
correlation_values_sel = # FILL IN
# 5. Let's reorganize each element of the RDD to get user 1 as key and the tuple
# (user2, similarity) as value
all_correlations_with_userid = # FILL IN
# 6. Group the elements of all_correlations_with_userid by key (groupByKey() method)
# Each element of this new RDD is (userID, spark-iterable), where the spark iterable has
# a list with all the similar users (UserID, similarity)
RDD_sim_users = # FILL IN
RDD_sim_users.cache()
In [ ]:
###########################################################
# TEST CELL
###########################################################
id_user = 1
sim_user1 = RDD_sim_users.filter(lambda x : x[0]==id_user).first()
sim_check = sc.parallelize(list(sim_user1[1]))
Test.assertEquals(np.round(sim_check.filter(lambda x: x[0] == 22).first()[1],2), 0.34, 'incorrect result: similarity value is incorrect')
Test.assertEquals(np.round(sim_check.filter(lambda x: x[0] == 120).first()[1],2), 0.37, 'incorrect result: similarity value is incorrect')
Once you know how similar a user is to the remaining users, you would like to know which items should be recommended to this user.
For this purpose, we have to assign a rating to each item by averaging the ratings that the similar users have given to that item, according to this expression: $$ pred(user_a, item_i) = \bar{r}_a + \frac{\sum_{b \in N} sim(user_a, user_b) \cdot (r_{b,i}- \bar{r}_b)}{\sum_{b \in N} sim(user_a, user_b)}$$ where $N$ is the set of neighbors of user $a$ (those with $sim > sim_{th}$) who have rated item $i$.
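For instance, if $\bar{r}_a = 3.5$ and two neighbours with similarities $0.8$ and $0.4$ have rated item $i$ with normalized ratings $1.0$ and $0.5$ (i.e., that much above their own means), then: $$ pred(user_a, item_i) = 3.5 + \frac{0.8 \cdot 1.0 + 0.4 \cdot 0.5}{0.8 + 0.4} = 3.5 + \frac{1.0}{1.2} \approx 4.33 $$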
The next cell contains the necessary code to compute the above expression, given the average rating of a user. Review this function, paying special attention to the input parameters.
In [ ]:
def compute_predictions(med_user, list_sim, list_ratings):
    """ Estimate the rating that a user u would assign to an item i
    Args:
        med_user: average rating of the user u
        list_sim: list of tuples (id_user, similarity) with the users who are
            similar to the user u and their similarity values
        list_ratings: list of tuples (id_user, rating) with the ratings that the remaining
            users have already assigned to the item i. Note that the rating values are normalized
            (the average rating of the corresponding user has been previously subtracted, so that
            this function implements the above expression)
    Returns:
        pred_value: estimated rating of the user u for the item i
    """
    if (list_sim is not None) and (list_ratings is not None):
        dict1 = dict(list_sim)
        dict2 = dict(list_ratings)
        list_intersect = [(k, dict1[k], dict2[k]) for k in sorted(dict1) if k in dict2]
        # We have built a list with: (user_id_similar, sim_value, rating_user_sim)
        if len(list_intersect) > 0:
            aux = [(sim * rat, sim) for (id_user, sim, rat) in list_intersect]
            numerator, denominator = zip(*aux)
            pred_value = med_user + sum(numerator) / sum(denominator)
        else:
            pred_value = med_user
    else:
        pred_value = med_user
    return pred_value
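# A minimal usage sketch (not part of the exercise): hand-built lists with the same toy
# numbers as in the worked example above; users not present in both lists are ignored
toy_med_user = 3.5
toy_list_sim = [(7, 0.8), (9, 0.4), (11, 0.6)]        # (id_user, similarity)
toy_list_ratings = [(7, 1.0), (9, 0.5), (20, -1.0)]   # (id_user, normalized rating)
print compute_predictions(toy_med_user, toy_list_sim, toy_list_ratings)   # approximately 4.33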
To obtain the predicted outputs for the test data and evaluate the performance of the user-based recommender, we need to compute all the required input arguments. Follow the steps in the next sections to obtain them.
1. Computing the average rating of each user
Please, review the baseline recommender section (Subsection 2).
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
# 1. From trainingRDD, create a new RDD with the fields (user, rating), and convert it to
# (user, list_ratings). Hint: groupByKey()
RDD_users_ratings = # FILL IN
# Convert this RDD (user, list_ratings) -> (user, mean_user). Use getAverages() function
RDD_users_mean = # FILL IN
In [ ]:
###########################################################
# TEST CELL
###########################################################
id_user = 1
mean_user1 = RDD_users_mean.filter(lambda x : x[0]==id_user).first()
Test.assertEquals(np.round(mean_user1[1],2), 3.6, 'incorrect result: mean rating value is incorrect')
2. Create a list of ratings
Here, you should create a new RDD with one element per item, where each element is given by (item_id, list_ratings), and list_ratings is a set of tuples (user_id, rating) with the id of each user who has rated item_id and the assigned rating. Besides, the ratings in the list have to be normalized (subtracting the corresponding user's average rating).
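As a reference, here is a toy sketch of leftOuterJoin(), which is also useful in the combination steps below; note the nested (key, (left_value, right_value)) format of its output (the element order may differ):
left = sc.parallelize([(1, (10, 4.0)), (1, (20, 5.0)), (2, (30, 3.0))])   # (user, (item, rating))
means = sc.parallelize([(1, 3.6), (2, 3.2)])                              # (user, mean_user)
print left.leftOuterJoin(means).collect()
# e.g. [(1, ((10, 4.0), 3.6)), (1, ((20, 5.0), 3.6)), (2, ((30, 3.0), 3.2))]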
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
# 2.1. Create an RDD with the training ratings after subtracting each user's mean
# Create an RDD with elements (user, (item, rating))
trainingRDD_aux = # FILL IN
# Combine it with RDD_users_mean -> (user, ((item, rating), user_mean))
# Hint: leftOuterJoin()
trainingRDD_mean = # FILL IN
# Create a new RDD subtracting the mean from each rating and reorganize it -> (user, item, rating_norm)
trainingRDD_norm = # FILL IN
# 2.2. Create an RDD with the normalized training ratings in the form (item, list((user, rating)))
RDD_ratings_item = # FILL IN
In [ ]:
###########################################################
# TEST CELL
###########################################################
id_item = 22
ratings_item = RDD_ratings_item.filter(lambda x : x[0]==id_item).first()
ratings_check = sc.parallelize(list(ratings_item[1]))
Test.assertEquals(np.round(ratings_check.filter(lambda x: x[0] == 608).first()[1],2), 0.26, 'incorrect result: rating value is incorrect')
Test.assertEquals(np.round(ratings_check.filter(lambda x: x[0] == 184).first()[1],2), -0.66, 'incorrect result: rating value is incorrect')
3. Combine previous RDDs
Until now, we have these RDDs: RDD_users_mean with the average rating of each user, RDD_sim_users with the list of similar users of each user, and RDD_ratings_item with the normalized ratings given to each item.
To make the predictions over the test data, for each pair (userID, itemID) of the test ratings we need to build an element containing: the average rating of userID, the list of users similar to userID, and the list of normalized ratings of itemID.
Then, here, we are going to combine the above RDDs to create a new RDD with elements given by: ((userID, itemID), average_rating_userID, list_similar_users_to_userID, list_ratings_itemID)
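To fix ideas, one element of the final RDD should look like this (hand-written toy values, loosely based on the test cell below; in practice the two lists will be spark-iterables):
example_element = ((218, 516),                   # (userID, itemID) pair from the test set
                   3.62,                         # average training rating of userID
                   [(24, 0.31), (37, 0.55)],     # (similar user, similarity) pairs
                   [(308, 0.23), (99, -0.40)])   # (user, normalized rating) pairs for itemID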
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
# 3.1 Create an input RDD, RDD_test_ids, consisting of (UserID, MovieID) pairs
# that you extract from testRDD (i.e., remove the field Rating)
RDD_test_ids = # FILL IN
# 3.2 Combine RDD_test_ids with RDD_users_mean to create an RDD (user, (item, mean_user))
# Hint: leftOuterJoin()
RDD_test_ids_mean = # FILL IN
# 3.3 Combine RDD_test_ids_mean with RDD_sim_users to create an RDD with elements
# (user, ((item, mean_user), list_sim_user)). Hint: leftOuterJoin()
# Next, reformat it to obtain elements (item, (user, mean_user, list_sim_user))
RDD_test_ids_sim = # FILL IN
# 3.4 Combine RDD_test_ids_sim with RDD_ratings_item to create an RDD with elements
# (item, ((user, mean_user, list_sim_user), list_item_rating)). Hint: leftOuterJoin()
# Next, reformat it to obtain elements ((user, item), mean_user, list_sim_user, list_item_rating)
RDD_test_ids_sim_rat = # FILL IN
In [ ]:
###########################################################
# TEST CELL
###########################################################
check_out = RDD_test_ids_sim_rat.filter(lambda x: x[0]==(218, 516)).first()
Test.assertEquals(np.round(check_out[1],2), 3.62, 'incorrect result: mean value of the RDD is incorrect')
sim_check = sc.parallelize(list(check_out[2]))
Test.assertEquals(np.round(sim_check.filter(lambda x: x[0] == 24).first()[1],2), 0.31, 'incorrect result: similarity value is incorrect')
rating_check = sc.parallelize(list(check_out[3]))
Test.assertEquals(np.round(rating_check.filter(lambda x: x[0] == 308).first()[1],2), 0.23, 'incorrect result: rating value is incorrect')
4. Compute predictions
Complete the next cell to use the elements of RDD_test_ids_sim_rat as inputs to the function compute_predictions( ) and obtain the predicted ratings over the test data.
In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
# For each element of RDD_test_ids_sim_rat, call compute_predictions() and create a new RDD
# with elements ((user, item), predicted value); then reformat it to (user, item, predicted value)
RDD_outputs = # FILL IN
RDD_predictions = # FILL IN
In [ ]:
###########################################################
# TEST CELL
###########################################################
Test.assertEquals(np.round(RDD_predictions.filter(lambda x: (x[0], x[1]) == (840, 516)).first()[2],2), 4.8, 'incorrect result: predicted value is incorrect')
Test.assertEquals(np.round(RDD_predictions.filter(lambda x: (x[0], x[1]) == (174, 1032)).first()[2],2), 3.28, 'incorrect result: predicted value is incorrect')
Test.assertEquals(np.round(RDD_predictions.filter(lambda x: (x[0], x[1]) == (896, 12)).first()[2],2), 3.83, 'incorrect result: predicted value is incorrect')
Test.assertEquals(np.round(RDD_predictions.filter(lambda x: (x[0], x[1]) == (59, 528)).first()[2],2), 4.18, 'incorrect result: predicted value is incorrect')
5. Evaluate performance
In [ ]:
# Compute the MAE error
MAE = get_MAE(RDD_predictions, testRDD)
# Compute the RMSE error
RMSE = get_RMSE(RDD_predictions, testRDD)
print 'User based model ... MAE: %2.2f , RMSE: %2.2f ' % (MAE, RMSE)
In [ ]:
###########################################################
# TEST CELL
###########################################################
Test.assertEquals(np.round(MAE,2), 0.80, 'incorrect result: MAE value is incorrect')
Test.assertEquals(np.round(RMSE,2), 1.02, 'incorrect result: RMSE value is incorrect')